/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.namenode;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.InvalidPathException;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.ha.HAServiceStatus;
import org.apache.hadoop.ha.HealthCheckFailedException;
import org.apache.hadoop.ha.ServiceFailedException;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAServiceProtocolService;
import org.apache.hadoop.ha.protocolPB.HAServiceProtocolPB;
import org.apache.hadoop.ha.protocolPB.HAServiceProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
import org.apache.hadoop.hdfs.inotify.EventBatch;
import org.apache.hadoop.hdfs.inotify.EventBatchList;
import org.apache.hadoop.hdfs.protocol.AclException;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.FSLimitException;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeProtocolService;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.NamenodeProtocolService;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.NodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RefreshRegistry;
import org.apache.hadoop.ipc.RefreshResponse;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.WritableRpcEngine;
import org.apache.hadoop.ipc.proto.GenericRefreshProtocolProtos.GenericRefreshProtocolService;
import org.apache.hadoop.ipc.proto.RefreshCallQueueProtocolProtos.RefreshCallQueueProtocolService;
import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolPB;
import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolServerSideTranslatorPB;
import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolPB;
import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolServerSideTranslatorPB;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.Groups;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.security.proto.RefreshAuthorizationPolicyProtocolProtos.RefreshAuthorizationPolicyProtocolService;
import org.apache.hadoop.security.proto.RefreshUserMappingsProtocolProtos.RefreshUserMappingsProtocolService;
import org.apache.hadoop.security.protocolPB.RefreshAuthorizationPolicyProtocolPB;
import org.apache.hadoop.security.protocolPB.RefreshAuthorizationPolicyProtocolServerSideTranslatorPB;
import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolPB;
import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolServerSideTranslatorPB;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.tools.proto.GetUserMappingsProtocolProtos.GetUserMappingsProtocolService;
import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolPB;
import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolServerSideTranslatorPB;
import org.apache.hadoop.tracing.SpanReceiverInfo;
import org.apache.hadoop.tracing.TraceAdminPB.TraceAdminService;
import org.apache.hadoop.tracing.TraceAdminProtocolPB;
import org.apache.hadoop.tracing.TraceAdminProtocolServerSideTranslatorPB;
import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.util.VersionUtil;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.protobuf.BlockingService;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.MAX_PATH_DEPTH;
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.MAX_PATH_LENGTH;

/**
 * This class is responsible for handling all of the RPC calls to the NameNode.
 * It is created, started, and stopped by {@link NameNode}.
 */
class NameNodeRpcServer implements NamenodeProtocols {

    private static final Log LOG = NameNode.LOG;
    private static final Log stateChangeLog = NameNode.stateChangeLog;
    private static final Log blockStateChangeLog = NameNode.blockStateChangeLog;

    // Dependencies from other parts of NN.
    protected final FSNamesystem namesystem;
    protected final NameNode nn;
    private final NameNodeMetrics metrics;

    private final boolean serviceAuthEnabled;

    /**
     * The RPC server that listens to requests from DataNodes
     */
    private final RPC.Server serviceRpcServer;
    private final InetSocketAddress serviceRPCAddress;

    /**
     * The RPC server that listens to requests from clients
     */
    protected final RPC.Server clientRpcServer;
    protected final InetSocketAddress clientRpcAddress;

    private final String minimumDataNodeVersion;

    public NameNodeRpcServer(Configuration conf, NameNode nn)
            throws IOException {
        this.nn = nn;
        this.namesystem = nn.getNamesystem();
        this.metrics = NameNode.getNameNodeMetrics();

        int handlerCount =
                conf.getInt(DFS_NAMENODE_HANDLER_COUNT_KEY,
                        DFS_NAMENODE_HANDLER_COUNT_DEFAULT);

        // 设置序列化引擎
        RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class,
                ProtobufRpcEngine.class);

        //client和NameNode之间通信,需要调用的接口
        ClientNamenodeProtocolServerSideTranslatorPB
                clientProtocolServerTranslator =
                new ClientNamenodeProtocolServerSideTranslatorPB(this);
        // 构造BlockingService对象
        // 这个对象用于将Server提取出的请求转到clientProtocolServerTranslator
        BlockingService clientNNPbService = ClientNamenodeProtocol.
                newReflectiveBlockingService(clientProtocolServerTranslator);

        //datanode和namenode之间进行通信,需要调用的接口
        DatanodeProtocolServerSideTranslatorPB dnProtoPbTranslator =
                new DatanodeProtocolServerSideTranslatorPB(this);
        BlockingService dnProtoPbService = DatanodeProtocolService
                .newReflectiveBlockingService(dnProtoPbTranslator);

        //不同namenode之间进行调用需要的接口
        NamenodeProtocolServerSideTranslatorPB namenodeProtocolXlator =
                new NamenodeProtocolServerSideTranslatorPB(this);
        BlockingService NNPbService = NamenodeProtocolService
                .newReflectiveBlockingService(namenodeProtocolXlator);

        RefreshAuthorizationPolicyProtocolServerSideTranslatorPB refreshAuthPolicyXlator =
                new RefreshAuthorizationPolicyProtocolServerSideTranslatorPB(this);
        BlockingService refreshAuthService = RefreshAuthorizationPolicyProtocolService
                .newReflectiveBlockingService(refreshAuthPolicyXlator);

        RefreshUserMappingsProtocolServerSideTranslatorPB refreshUserMappingXlator =
                new RefreshUserMappingsProtocolServerSideTranslatorPB(this);
        BlockingService refreshUserMappingService = RefreshUserMappingsProtocolService
                .newReflectiveBlockingService(refreshUserMappingXlator);

        RefreshCallQueueProtocolServerSideTranslatorPB refreshCallQueueXlator =
                new RefreshCallQueueProtocolServerSideTranslatorPB(this);
        BlockingService refreshCallQueueService = RefreshCallQueueProtocolService
                .newReflectiveBlockingService(refreshCallQueueXlator);

        GenericRefreshProtocolServerSideTranslatorPB genericRefreshXlator =
                new GenericRefreshProtocolServerSideTranslatorPB(this);
        BlockingService genericRefreshService = GenericRefreshProtocolService
                .newReflectiveBlockingService(genericRefreshXlator);

        GetUserMappingsProtocolServerSideTranslatorPB getUserMappingXlator =
                new GetUserMappingsProtocolServerSideTranslatorPB(this);
        BlockingService getUserMappingService = GetUserMappingsProtocolService
                .newReflectiveBlockingService(getUserMappingXlator);

        HAServiceProtocolServerSideTranslatorPB haServiceProtocolXlator =
                new HAServiceProtocolServerSideTranslatorPB(this);
        BlockingService haPbService = HAServiceProtocolService
                .newReflectiveBlockingService(haServiceProtocolXlator);

        TraceAdminProtocolServerSideTranslatorPB traceAdminXlator =
                new TraceAdminProtocolServerSideTranslatorPB(this);
        BlockingService traceAdminService = TraceAdminService
                .newReflectiveBlockingService(traceAdminXlator);

        WritableRpcEngine.ensureInitialized();

        InetSocketAddress serviceRpcAddr = nn.getServiceRpcServerAddress(conf);
        if (serviceRpcAddr != null) {
            String bindHost = nn.getServiceRpcServerBindHost(conf);
            if (bindHost == null) {
                bindHost = serviceRpcAddr.getHostName();
            }
            LOG.info("Service RPC server is binding to " + bindHost + ":" +
                    serviceRpcAddr.getPort());

            int serviceHandlerCount =
                    conf.getInt(DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
                            DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
            // 用于响应来自Datanode的Rpc请求
            this.serviceRpcServer = new RPC.Builder(conf)
                    .setProtocol(
                            org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class)
                    .setInstance(clientNNPbService)
                    .setBindAddress(bindHost)
                    .setPort(serviceRpcAddr.getPort()).setNumHandlers(serviceHandlerCount)
                    .setVerbose(false)
                    .setSecretManager(namesystem.getDelegationTokenSecretManager())
                    .build();

            // Add all the RPC protocols that the namenode implements
            DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService,
                    serviceRpcServer);
            DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService,
                    serviceRpcServer);
            DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService,
                    serviceRpcServer);
            DFSUtil.addPBProtocol(conf, RefreshAuthorizationPolicyProtocolPB.class,
                    refreshAuthService, serviceRpcServer);
            DFSUtil.addPBProtocol(conf, RefreshUserMappingsProtocolPB.class,
                    refreshUserMappingService, serviceRpcServer);
            // We support Refreshing call queue here in case the client RPC queue is full
            DFSUtil.addPBProtocol(conf, RefreshCallQueueProtocolPB.class,
                    refreshCallQueueService, serviceRpcServer);
            DFSUtil.addPBProtocol(conf, GenericRefreshProtocolPB.class,
                    genericRefreshService, serviceRpcServer);
            DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class,
                    getUserMappingService, serviceRpcServer);
            DFSUtil.addPBProtocol(conf, TraceAdminProtocolPB.class,
                    traceAdminService, serviceRpcServer);

            // Update the address with the correct port
            InetSocketAddress listenAddr = serviceRpcServer.getListenerAddress();
            serviceRPCAddress = new InetSocketAddress(
                    serviceRpcAddr.getHostName(), listenAddr.getPort());
            nn.setRpcServiceServerAddress(conf, serviceRPCAddress);
        } else {
            serviceRpcServer = null;
            serviceRPCAddress = null;
        }
        InetSocketAddress rpcAddr = nn.getRpcServerAddress(conf);
        String bindHost = nn.getRpcServerBindHost(conf);
        if (bindHost == null) {
            bindHost = rpcAddr.getHostName();
        }
        LOG.info("RPC server is binding to " + bindHost + ":" + rpcAddr.getPort());

        // clientRpcServer 用于响应来自Datanode的Rpc请求
        this.clientRpcServer = new RPC.Builder(conf)
                .setProtocol(
                        org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class)
                .setInstance(clientNNPbService).setBindAddress(bindHost)
                .setPort(rpcAddr.getPort()).setNumHandlers(handlerCount)
                .setVerbose(false)
                .setSecretManager(namesystem.getDelegationTokenSecretManager()).build();

        // Add all the RPC protocols that the namenode implements
        DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService,
                clientRpcServer);
        DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService,
                clientRpcServer);
        DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService,
                clientRpcServer);
        DFSUtil.addPBProtocol(conf, RefreshAuthorizationPolicyProtocolPB.class,
                refreshAuthService, clientRpcServer);
        DFSUtil.addPBProtocol(conf, RefreshUserMappingsProtocolPB.class,
                refreshUserMappingService, clientRpcServer);
        DFSUtil.addPBProtocol(conf, RefreshCallQueueProtocolPB.class,
                refreshCallQueueService, clientRpcServer);
        DFSUtil.addPBProtocol(conf, GenericRefreshProtocolPB.class,
                genericRefreshService, clientRpcServer);
        DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class,
                getUserMappingService, clientRpcServer);
        DFSUtil.addPBProtocol(conf, TraceAdminProtocolPB.class,
                traceAdminService, clientRpcServer);

        // set service-level authorization security policy
        if (serviceAuthEnabled =
                conf.getBoolean(
                        CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {
            clientRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
            if (serviceRpcServer != null) {
                serviceRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
            }
        }

        // The rpc-server port can be ephemeral... ensure we have the correct info
        InetSocketAddress listenAddr = clientRpcServer.getListenerAddress();
        clientRpcAddress = new InetSocketAddress(
                rpcAddr.getHostName(), listenAddr.getPort());
        nn.setRpcServerAddress(conf, clientRpcAddress);

        minimumDataNodeVersion = conf.get(
                DFSConfigKeys.DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_KEY,
                DFSConfigKeys.DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_DEFAULT);

        // Set terse exception whose stack trace won't be logged
        this.clientRpcServer.addTerseExceptions(SafeModeException.class,
                FileNotFoundException.class,
                HadoopIllegalArgumentException.class,
                FileAlreadyExistsException.class,
                InvalidPathException.class,
                ParentNotDirectoryException.class,
                UnresolvedLinkException.class,
                AlreadyBeingCreatedException.class,
                QuotaExceededException.class,
                RecoveryInProgressException.class,
                AccessControlException.class,
                InvalidToken.class,
                LeaseExpiredException.class,
                NSQuotaExceededException.class,
                DSQuotaExceededException.class,
                AclException.class,
                FSLimitException.PathComponentTooLongException.class,
                FSLimitException.MaxDirectoryItemsExceededException.class,
                UnresolvedPathException.class);
    }

    /**
     * Allow access to the client RPC server for testing
     */
    @VisibleForTesting
    RPC.Server getClientRpcServer() {
        return clientRpcServer;
    }

    /**
     * Allow access to the service RPC server for testing
     */
    @VisibleForTesting
    RPC.Server getServiceRpcServer() {
        return serviceRpcServer;
    }

    /**
     * Start client and service RPC servers.
     */
    void start() {
        clientRpcServer.start();
        if (serviceRpcServer != null) {
            serviceRpcServer.start();
        }
    }

    /**
     * Wait until the RPC servers have shutdown.
     */
    void join() throws InterruptedException {
        clientRpcServer.join();
        if (serviceRpcServer != null) {
            serviceRpcServer.join();
        }
    }

    /**
     * Stop client and service RPC servers.
     */
    void stop() {
        if (clientRpcServer != null) {
            clientRpcServer.stop();
        }
        if (serviceRpcServer != null) {
            serviceRpcServer.stop();
        }
    }

    InetSocketAddress getServiceRpcAddress() {
        return serviceRPCAddress;
    }

    InetSocketAddress getRpcAddress() {
        return clientRpcAddress;
    }

    private static UserGroupInformation getRemoteUser() throws IOException {
        return NameNode.getRemoteUser();
    }


    /////////////////////////////////////////////////////
    // NamenodeProtocol
    /////////////////////////////////////////////////////
    @Override // NamenodeProtocol
    public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size)
            throws IOException {
        if (size <= 0) {
            throw new IllegalArgumentException(
                    "Unexpected not positive size: " + size);
        }
        checkNNStartup();
        namesystem.checkSuperuserPrivilege();
        return namesystem.getBlockManager().getBlocks(datanode, size);
    }

    @Override // NamenodeProtocol
    public ExportedBlockKeys getBlockKeys() throws IOException {
        checkNNStartup();
        namesystem.checkSuperuserPrivilege();
        return namesystem.getBlockManager().getBlockKeys();
    }

    @Override // NamenodeProtocol
    public void errorReport(NamenodeRegistration registration,
                            int errorCode,
                            String msg) throws IOException {
        checkNNStartup();
        namesystem.checkOperation(OperationCategory.UNCHECKED);
        namesystem.checkSuperuserPrivilege();
        verifyRequest(registration);
        LOG.info("Error report from " + registration + ": " + msg);
        if (errorCode == FATAL) {
            namesystem.releaseBackupNode(registration);
        }
    }

    @Override // NamenodeProtocol
    public NamenodeRegistration registerSubordinateNamenode(
            NamenodeRegistration registration) throws IOException {
        checkNNStartup();
        namesystem.checkSuperuserPrivilege();
        verifyLayoutVersion(registration.getVersion());
        NamenodeRegistration myRegistration = nn.setRegistration();
        namesystem.registerBackupNode(registration, myRegistration);
        return myRegistration;
    }

    @Override // NamenodeProtocol
    public NamenodeCommand startCheckpoint(NamenodeRegistration registration)
            throws IOException {
        checkNNStartup();
        namesystem.checkSuperuserPrivilege();
        verifyRequest(registration);
        if (!nn.isRole(NamenodeRole.NAMENODE))
            throw new IOException("Only an ACTIVE node can invoke startCheckpoint.");
        return namesystem.startCheckpoint(registration, nn.setRegistration());
    }

    @Override // NamenodeProtocol
    public void endCheckpoint(NamenodeRegistration registration,
                              CheckpointSignature sig) throws IOException {
        checkNNStartup();
        namesystem.checkSuperuserPrivilege();
        namesystem.endCheckpoint(registration, sig);
    }

    @Override // ClientProtocol
    public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
            throws IOException {
        checkNNStartup();
        return namesystem.getDelegationToken(renewer);
    }

    @Override // ClientProtocol
    public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
            throws InvalidToken, IOException {
        checkNNStartup();
        return namesystem.renewDelegationToken(token);
    }

    @Override // ClientProtocol
    public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
            throws IOException {
        checkNNStartup();
        namesystem.cancelDelegationToken(token);
    }

    /**
     * 获取指定范围内所有数据块的位置信息
     * 每个数据块副本所在Datanode信息
     */
    @Override // ClientProtocol
    public LocatedBlocks getBlockLocations(String src,
                                           long offset,
                                           long length)
            throws IOException {
        checkNNStartup();
        metrics.incrGetBlockLocations();
        return namesystem.getBlockLocations(getClientMachine(),
                src, offset, length);
    }

    @Override // ClientProtocol
    public FsServerDefaults getServerDefaults() throws IOException {
        checkNNStartup();
        return namesystem.getServerDefaults();
    }

    /**
     * 在文件目录树中创建一个新的空文件,创建的路径由src指定,这个空文件创建后对其他的客户端是"可读的"
     * 但是这些客户端不能删除,重命名,或者移动这个文件,知道这个文件被关闭或者租约到期
     * <p>
     * 客户端写一个新文件的时候,会首先调用create()方法在文件系统目录树中创建一个空文件,然后调用addBlock()
     * 方法获取存储文件数据的数据块的位置信息,最后客户端就可以根据位置信息建立数据流管道,向数据节点写入数据
     */
    @Override // ClientProtocol
    public HdfsFileStatus create(String src, FsPermission masked,
                                 String clientName, EnumSetWritable<CreateFlag> flag,
                                 boolean createParent, short replication, long blockSize,
                                 CryptoProtocolVersion[] supportedVersions)
            throws IOException {
        checkNNStartup();
        String clientMachine = getClientMachine();
        if (stateChangeLog.isDebugEnabled()) {
            stateChangeLog.debug("*DIR* NameNode.create: file "
                    + src + " for " + clientName + " at " + clientMachine);
        }
        if (!checkPathLength(src)) {
            throw new IOException("create: Pathname too long.  Limit "
                    + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
        }
        namesystem.checkOperation(OperationCategory.WRITE);
        // FsNameSystem.startFile()完成文件目录树中新增一个文件的操作,包括去写那个edits log
        HdfsFileStatus fileStatus = namesystem.startFile(src, new PermissionStatus(
                        getRemoteUser().getShortUserName(), null, masked),
                clientName, clientMachine, flag.get(), createParent, replication,
                blockSize, supportedVersions);
        metrics.incrFilesCreated();
        metrics.incrCreateFileOps();
        return fileStatus;
    }

    @Override // ClientProtocol
    public LocatedBlock append(String src, String clientName)
            throws IOException {
        checkNNStartup();
        String clientMachine = getClientMachine();
        if (stateChangeLog.isDebugEnabled()) {
            stateChangeLog.debug("*DIR* NameNode.append: file "
                    + src + " for " + clientName + " at " + clientMachine);
        }
        namesystem.checkOperation(OperationCategory.WRITE);
        LocatedBlock info = namesystem.appendFile(src, clientName, clientMachine);
        metrics.incrFilesAppended();
        return info;
    }

    @Override // ClientProtocol
    public boolean recoverLease(String src, String clientName) throws IOException {
        checkNNStartup();
        String clientMachine = getClientMachine();
        return namesystem.recoverLease(src, clientName, clientMachine);
    }

    @Override // ClientProtocol
    public boolean setReplication(String src, short replication)
            throws IOException {
        checkNNStartup();
        return namesystem.setReplication(src, replication);
    }

    @Override
    public void setStoragePolicy(String src, String policyName)
            throws IOException {
        checkNNStartup();
        namesystem.setStoragePolicy(src, policyName);
    }

    @Override
    public BlockStoragePolicy[] getStoragePolicies() throws IOException {
        checkNNStartup();
        return namesystem.getStoragePolicies();
    }

    /**
     * 修改文件权限
     */
    @Override // ClientProtocol
    public void setPermission(String src, FsPermission permissions)
            throws IOException {
        checkNNStartup();
        namesystem.setPermission(src, permissions);
    }

    /**
     * 修改文件组
     */
    @Override // ClientProtocol
    public void setOwner(String src, String username, String groupname)
            throws IOException {
        checkNNStartup();
        namesystem.setOwner(src, username, groupname);
    }

    /**
     * 机架感知的设计思想,一个机架放2个datanode,另外一个机架放一个datanode
     * 为什么第一个机架要放2个block呢?因为hdfs客户端在上传block的时候,是先给第一个datanode上传block,第一个机架里的
     * 第一个datanode接收到了block以后,会复制后发送给同一个机架里的另外一个datanode,这个保证通风机架传输,性能较高.
     * 为什么要在另外一个机架也放一个datanode呢?如果第一个机架宕机了,那么第二个机架式没问题的,还有一个datanode可以提供副本,保证数据的高容错,高可用
     * <p>
     * 向指定文件添加一个新的数据块,并获取存储这个数据块副本的所有数据节点的位置信息
     * NameNode在分配新的数据块时,会顺便提交上一个数据块,这里的previous参数就是上一个数据块的引用
     * excludedNodes则是数据节点的黑名单,保存了客户端无法连接的一些数据节点
     * favoredNodes则是客户端希望的保存数据块副本的数据节点列表
     * <p>
     * 客户端调用addBlock()方法获取新的数据块的位置信息后,会建立这些数据节点的数据管道流
     */
    @Override
    public LocatedBlock addBlock(String src, String clientName,
                                 ExtendedBlock previous, DatanodeInfo[] excludedNodes, long fileId,
                                 String[] favoredNodes)
            throws IOException {
        checkNNStartup();
        if (stateChangeLog.isDebugEnabled()) {
            stateChangeLog.debug("*BLOCK* NameNode.addBlock: file " + src
                    + " fileId=" + fileId + " for " + clientName);
        }
        Set<Node> excludedNodesSet = null;
        if (excludedNodes != null) {
            excludedNodesSet = new HashSet<Node>(excludedNodes.length);
            for (Node node : excludedNodes) {
                excludedNodesSet.add(node);
            }
        }
        List<String> favoredNodesList = (favoredNodes == null) ? null
                : Arrays.asList(favoredNodes);
        // 核心代码
        LocatedBlock locatedBlock = namesystem.getAdditionalBlock(src, fileId,
                clientName, previous, excludedNodesSet, favoredNodesList);
        if (locatedBlock != null)
            metrics.incrAddBlockOps();
        return locatedBlock;
    }

    /**
     * 如果客户端已经成功建立了数据管道流,在客户端写某个数据块时,存储这个数据块副本的某个数据节点出现了错误如何处理?
     * 客户端会调用这个方法向namenode申请一个新的datanode来替代出现故障的datanode,然后 客户端会调用updateBlockForPipeline()
     * 方法向namenode申请为这个数据块分配新的时间戳,这样故障节点上的没能写完整的数据块的时间戳就会过期,在后续的块汇报操作中会被删除
     * 最后客户端就可以使用新的时间戳建立新的数据流管道,来执行对数据块的写操作了.
     * 数据流管道建立成功后,客户端还需要调用updatePipeline()方法更新Namenode中当前数据块的数据流管道信息
     */
    @Override // ClientProtocol
    public LocatedBlock getAdditionalDatanode(final String src,
                                              final long fileId, final ExtendedBlock blk,
                                              final DatanodeInfo[] existings, final String[] existingStorageIDs,
                                              final DatanodeInfo[] excludes,
                                              final int numAdditionalNodes, final String clientName
    ) throws IOException {
        checkNNStartup();
        if (LOG.isDebugEnabled()) {
            LOG.debug("getAdditionalDatanode: src=" + src
                    + ", fileId=" + fileId
                    + ", blk=" + blk
                    + ", existings=" + Arrays.asList(existings)
                    + ", excludes=" + Arrays.asList(excludes)
                    + ", numAdditionalNodes=" + numAdditionalNodes
                    + ", clientName=" + clientName);
        }

        metrics.incrGetAdditionalDatanodeOps();

        Set<Node> excludeSet = null;
        if (excludes != null) {
            excludeSet = new HashSet<Node>(excludes.length);
            for (Node node : excludes) {
                excludeSet.add(node);
            }
        }
        return namesystem.getAdditionalDatanode(src, fileId, blk, existings,
                existingStorageIDs, excludeSet, numAdditionalNodes, clientName);
    }

    /**
     * The client needs to give up on the block.
     * <p>
     * 放弃一个新申请的数据块,用于处理客户端建立数据流管道时数据节点出现故障的情况
     * 当客户端获取了一个新申请的数据块,发现无法建立连接时,会调用这个方法放弃这个数据块
     * 之后客户单会再次调用addBlock()方法获取新的数据块
     */
    @Override // ClientProtocol
    public void abandonBlock(ExtendedBlock b, long fileId, String src,
                             String holder) throws IOException {
        checkNNStartup();
        if (stateChangeLog.isDebugEnabled()) {
            stateChangeLog.debug("*BLOCK* NameNode.abandonBlock: "
                    + b + " of file " + src);
        }
        if (!namesystem.abandonBlock(b, fileId, src, holder)) {
            throw new IOException("Cannot abandon block during write to " + src);
        }
    }

    /**
     * 当客户端完成整个文件的写入操作后,调用这个方法
     * 该文件的所有数据块至少有一个有效副本时,返回true
     */
    @Override // ClientProtocol
    public boolean complete(String src, String clientName,
                            ExtendedBlock last, long fileId)
            throws IOException {
        checkNNStartup();
        if (stateChangeLog.isDebugEnabled()) {
            stateChangeLog.debug("*DIR* NameNode.complete: "
                    + src + " fileId=" + fileId + " for " + clientName);
        }
        return namesystem.completeFile(src, clientName, last, fileId);
    }

    /**
     * The client has detected an error on the specified located blocks
     * and is reporting them to the server.  For now, the namenode will
     * mark the block as corrupt.  In the future we might
     * check the blocks are actually corrupt.
     * <p>
     * 向nameNode汇报错误的数据块
     * 当客户端从数据节点读取数据块且发现数据块的校验和并不正确,就会汇报这个错误的数据块信息
     * <p>
     * DataNode会在一下三种情况下调用这个方法:
     * 1:DataBlockScanner线程定期扫描数据节点上存储的数据块,发现数据块的校验出现错误
     * 2:数据流管道写数据时,DataNode接收一个新的数据块,进行数据块校验操作出现错误时
     * 3:进行数据块复制操作,Datanode读取本地存储的数据块时,发现本地数据块副本的长度小于namenode记录的长度,则认为该数据块无效
     */
    @Override // ClientProtocol, DatanodeProtocol
    public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
        checkNNStartup();
        namesystem.reportBadBlocks(blocks);
    }

    @Override // ClientProtocol
    public LocatedBlock updateBlockForPipeline(ExtendedBlock block, String clientName)
            throws IOException {
        checkNNStartup();
        return namesystem.updateBlockForPipeline(block, clientName);
    }


    @Override // ClientProtocol
    public void updatePipeline(String clientName, ExtendedBlock oldBlock,
                               ExtendedBlock newBlock, DatanodeID[] newNodes, String[] newStorageIDs)
            throws IOException {
        checkNNStartup();
        namesystem.checkOperation(OperationCategory.WRITE);
        namesystem.updatePipeline(clientName, oldBlock, newBlock, newNodes, newStorageIDs);
    }

    /**
     * 用于在租期恢复操作时同步数据块的状态
     */
    @Override // DatanodeProtocol
    public void commitBlockSynchronization(ExtendedBlock block,
                                           long newgenerationstamp, long newlength,
                                           boolean closeFile, boolean deleteblock, DatanodeID[] newtargets,
                                           String[] newtargetstorages)
            throws IOException {
        checkNNStartup();
        namesystem.commitBlockSynchronization(block, newgenerationstamp,
                newlength, closeFile, deleteblock, newtargets, newtargetstorages);
    }

    @Override // ClientProtocol
    public long getPreferredBlockSize(String filename)
            throws IOException {
        checkNNStartup();
        return namesystem.getPreferredBlockSize(filename);
    }

    @Deprecated
    @Override // ClientProtocol
    public boolean rename(String src, String dst) throws IOException {
        checkNNStartup();
        if (stateChangeLog.isDebugEnabled()) {
            stateChangeLog.debug("*DIR* NameNode.rename: " + src + " to " + dst);
        }
        if (!checkPathLength(dst)) {
            throw new IOException("rename: Pathname too long.  Limit "
                    + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
        }
        namesystem.checkOperation(OperationCategory.WRITE);
        boolean ret = namesystem.renameTo(src, dst);
        if (ret) {
            metrics.incrFilesRenamed();
        }
        return ret;
    }

    /**
     * 将两个已有文件拼成一个
     */
    @Override // ClientProtocol
    public void concat(String trg, String[] src) throws IOException {
        checkNNStartup();
        namesystem.checkOperation(OperationCategory.WRITE);
        namesystem.concat(trg, src);
    }

    /**
     * 更改文件/目录名称
     */
    @Override // ClientProtocol
    public void rename2(String src, String dst, Options.Rename... options)
            throws IOException {
        checkNNStartup();
        if (stateChangeLog.isDebugEnabled()) {
            stateChangeLog.debug("*DIR* NameNode.rename: " + src + " to " + dst);
        }
        if (!checkPathLength(dst)) {
            throw new IOException("rename: Pathname too long.  Limit "
                    + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
        }
        namesystem.checkOperation(OperationCategory.WRITE);
        namesystem.renameTo(src, dst, options);
        metrics.incrFilesRenamed();
    }

    /**
     * 从文件系统中删除指定文件/目录
     */
    @Override // ClientProtocol
    public boolean delete(String src, boolean recursive) throws IOException {
        checkNNStartup();
        if (stateChangeLog.isDebugEnabled()) {
            stateChangeLog.debug("*DIR* Namenode.delete: src=" + src
                    + ", recursive=" + recursive);
        }
        namesystem.checkOperation(OperationCategory.WRITE);
        boolean ret = namesystem.delete(src, recursive);
        if (ret)
            metrics.incrDeleteFileOps();
        return ret;
    }

    /**
     * Check path length does not exceed maximum.  Returns true if
     * length and depth are okay.  Returns false if length is too long
     * or depth is too great.
     */
    private boolean checkPathLength(String src) {
        Path srcPath = new Path(src);
        return (src.length() <= MAX_PATH_LENGTH &&
                srcPath.depth() <= MAX_PATH_DEPTH);
    }

    /**
     * 元数据管理通过创建目录来学习
     */
    @Override // ClientProtocol
    public boolean mkdirs(String src, FsPermission masked, boolean createParent)
            throws IOException {
        checkNNStartup();
        if (stateChangeLog.isDebugEnabled()) {
            stateChangeLog.debug("*DIR* NameNode.mkdirs: " + src);
        }
        if (!checkPathLength(src)) {
            throw new IOException("mkdirs: Pathname too long.  Limit "
                    + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
        }
        return namesystem.mkdirs(src,
                new PermissionStatus(getRemoteUser().getShortUserName(),
                        null, masked), createParent);
    }

    /**
     * 在写数据的过程中,Client节点有可能在任意时刻发生故障,为了预防这种情况,对于任意一个Client打开的文件都需要定期更新租约
     * 如果NameNode长时间没有收到Client的租约更新消息,就会认为Client发生故障,这时就会触发一次租约恢复的操作,关闭文件并且
     * 同步所有数据节点上这个文件数据块的状态
     */
    @Override // ClientProtocol
    public void renewLease(String clientName) throws IOException {
        checkNNStartup();
        namesystem.renewLease(clientName);
    }

    /**
     * 读取一个指定目录下所有项目
     */
    @Override // ClientProtocol
    public DirectoryListing getListing(String src, byte[] startAfter,
                                       boolean needLocation) throws IOException {
        checkNNStartup();
        DirectoryListing files = namesystem.getListing(
                src, startAfter, needLocation);
        if (files != null) {
            metrics.incrGetListingOps();
            metrics.incrFilesInGetListingOps(files.getPartialListing().length);
        }
        return files;
    }

    @Override // ClientProtocol
    public HdfsFileStatus getFileInfo(String src) throws IOException {
        checkNNStartup();
        metrics.incrFileInfoOps();
        return namesystem.getFileInfo(src, true);
    }

    @Override // ClientProtocol
    public boolean isFileClosed(String src) throws IOException {
        checkNNStartup();
        return namesystem.isFileClosed(src);
    }

    @Override // ClientProtocol
    public HdfsFileStatus getFileLinkInfo(String src) throws IOException {
        checkNNStartup();
        metrics.incrFileInfoOps();
        return namesystem.getFileInfo(src, false);
    }

    @Override // ClientProtocol
    public long[] getStats() throws IOException {
        checkNNStartup();
        namesystem.checkOperation(OperationCategory.READ);
        return namesystem.getStats();
    }

    /**
     * 获取集群中存活的,死亡的或者所有的数据节点信息
     */
    @Override // ClientProtocol
    public DatanodeInfo[] getDatanodeReport(DatanodeReportType type)
            throws IOException {
        checkNNStartup();
        DatanodeInfo results[] = namesystem.datanodeReport(type);
        if (results == null) {
            throw new IOException("Failed to get datanode report for " + type
                    + " datanodes.");
        }
        return results;
    }

    @Override // ClientProtocol
    public DatanodeStorageReport[] getDatanodeStorageReport(
            DatanodeReportType type) throws IOException {
        checkNNStartup();
        final DatanodeStorageReport[] reports = namesystem.getDatanodeStorageReport(type);
        if (reports == null) {
            throw new IOException("Failed to get datanode storage report for " + type
                    + " datanodes.");
        }
        return reports;
    }

    /**
     * 进入,离开安全模式,或者获取当前安全模式的状态
     */
    @Override // ClientProtocol
    public boolean setSafeMode(SafeModeAction action, boolean isChecked)
            throws IOException {
        checkNNStartup();
        OperationCategory opCategory = OperationCategory.UNCHECKED;
        if (isChecked) {
            if (action == SafeModeAction.SAFEMODE_GET) {
                opCategory = OperationCategory.READ;
            } else {
                opCategory = OperationCategory.WRITE;
            }
        }
        namesystem.checkOperation(opCategory);
        return namesystem.setSafeMode(action);
    }

    @Override // ClientProtocol
    public boolean restoreFailedStorage(String arg) throws IOException {
        checkNNStartup();
        return namesystem.restoreFailedStorage(arg);
    }

    /**
     * 将Namenode内存中的元数据保存至新的fsimage中,并且重置editslog
     * 这个操作必须处于安全模式中
     */
    @Override // ClientProtocol
    public void saveNamespace() throws IOException {
        checkNNStartup();
        namesystem.saveNamespace();
    }

    @Override // ClientProtocol
    public long rollEdits() throws AccessControlException, IOException {
        checkNNStartup();
        CheckpointSignature sig = namesystem.rollEditLog();
        return sig.getCurSegmentTxId();
    }

    /**
     * 触发namenode重新读取include/exclude文件
     */
    @Override // ClientProtocol
    public void refreshNodes() throws IOException {
        checkNNStartup();
        namesystem.refreshNodes();
    }

    @Override // NamenodeProtocol
    public long getTransactionID() throws IOException {
        checkNNStartup();
        namesystem.checkOperation(OperationCategory.UNCHECKED);
        namesystem.checkSuperuserPrivilege();
        return namesystem.getFSImage().getLastAppliedOrWrittenTxId();
    }

    @Override // NamenodeProtocol
    public long getMostRecentCheckpointTxId() throws IOException {
        checkNNStartup();
        namesystem.checkOperation(OperationCategory.UNCHECKED);
        namesystem.checkSuperuserPrivilege();
        return namesystem.getFSImage().getMostRecentCheckpointTxId();
    }

    @Override // NamenodeProtocol
    public CheckpointSignature rollEditLog() throws IOException {
        checkNNStartup();
        namesystem.checkSuperuserPrivilege();
        return namesystem.rollEditLog();
    }

    @Override // NamenodeProtocol
    public RemoteEditLogManifest getEditLogManifest(long sinceTxId)
            throws IOException {
        checkNNStartup();
        namesystem.checkOperation(OperationCategory.READ);
        namesystem.checkSuperuserPrivilege();
        return namesystem.getEditLog().getEditLogManifest(sinceTxId);
    }

    @Override // NamenodeProtocol
    public boolean isUpgradeFinalized() throws IOException {
        checkNNStartup();
        namesystem.checkSuperuserPrivilege();
        return namesystem.isUpgradeFinalized();
    }

    @Override // ClientProtocol
    public void finalizeUpgrade() throws IOException {
        checkNNStartup();
        namesystem.finalizeUpgrade();
    }

    @Override // ClientProtocol
    public RollingUpgradeInfo rollingUpgrade(RollingUpgradeAction action) throws IOException {
        checkNNStartup();
        LOG.info("rollingUpgrade " + action);
        switch (action) {
            case QUERY:
                return namesystem.queryRollingUpgrade();
            case PREPARE:
                return namesystem.startRollingUpgrade();
            case FINALIZE:
                namesystem.finalizeRollingUpgrade();
                return null;
            default:
                throw new UnsupportedActionException(action + " is not yet supported.");
        }
    }

    @Override // ClientProtocol
    public void metaSave(String filename) throws IOException {
        checkNNStartup();
        namesystem.metaSave(filename);
    }

    @Override // ClientProtocol
    public CorruptFileBlocks listCorruptFileBlocks(String path, String cookie)
            throws IOException {
        checkNNStartup();
        String[] cookieTab = new String[]{cookie};
        Collection<FSNamesystem.CorruptFileBlockInfo> fbs =
                namesystem.listCorruptFileBlocks(path, cookieTab);

        String[] files = new String[fbs.size()];
        int i = 0;
        for (FSNamesystem.CorruptFileBlockInfo fb : fbs) {
            files[i++] = fb.path;
        }
        return new CorruptFileBlocks(files, cookieTab[0]);
    }

    /**
     * Tell all datanodes to use a new, non-persistent bandwidth value for
     * dfs.datanode.balance.bandwidthPerSec.
     *
     * @param bandwidth Balancer bandwidth in bytes per second for all datanodes.
     * @throws IOException
     */
    @Override // ClientProtocol
    public void setBalancerBandwidth(long bandwidth) throws IOException {
        checkNNStartup();
        namesystem.setBalancerBandwidth(bandwidth);
    }

    @Override // ClientProtocol
    public ContentSummary getContentSummary(String path) throws IOException {
        checkNNStartup();
        return namesystem.getContentSummary(path);
    }

    @Override // ClientProtocol
    public void setQuota(String path, long namespaceQuota, long diskspaceQuota)
            throws IOException {
        checkNNStartup();
        namesystem.setQuota(path, namespaceQuota, diskspaceQuota);
    }

    @Override // ClientProtocol
    public void fsync(String src, long fileId, String clientName,
                      long lastBlockLength)
            throws IOException {
        checkNNStartup();
        namesystem.fsync(src, fileId, clientName, lastBlockLength);
    }

    /**
     * 修改文件修改时间,访问时间
     */
    @Override // ClientProtocol
    public void setTimes(String src, long mtime, long atime)
            throws IOException {
        checkNNStartup();
        namesystem.setTimes(src, mtime, atime);
    }

    @Override // ClientProtocol
    public void createSymlink(String target, String link, FsPermission dirPerms,
                              boolean createParent) throws IOException {
        checkNNStartup();
        namesystem.checkOperation(OperationCategory.WRITE);
        metrics.incrCreateSymlinkOps();
        /* We enforce the MAX_PATH_LENGTH limit even though a symlink target
         * URI may refer to a non-HDFS file system.
         */
        if (!checkPathLength(link)) {
            throw new IOException("Symlink path exceeds " + MAX_PATH_LENGTH +
                    " character limit");

        }
        if ("".equals(target)) {
            throw new IOException("Invalid symlink target");
        }
        final UserGroupInformation ugi = getRemoteUser();
        namesystem.createSymlink(target, link,
                new PermissionStatus(ugi.getShortUserName(), null, dirPerms), createParent);
    }

    @Override // ClientProtocol
    public String getLinkTarget(String path) throws IOException {
        checkNNStartup();
        metrics.incrGetLinkTargetOps();
        HdfsFileStatus stat = null;
        try {
            stat = namesystem.getFileInfo(path, false);
        } catch (UnresolvedPathException e) {
            return e.getResolvedPath().toString();
        } catch (UnresolvedLinkException e) {
            // The NameNode should only throw an UnresolvedPathException
            throw new AssertionError("UnresolvedLinkException thrown");
        }
        if (stat == null) {
            throw new FileNotFoundException("File does not exist: " + path);
        } else if (!stat.isSymlink()) {
            throw new IOException("Path " + path + " is not a symbolic link");
        }
        return stat.getSymlink();
    }

    /**
     * Datanode向nameNode注册
     */
    @Override // DatanodeProtocol
    public DatanodeRegistration registerDatanode(DatanodeRegistration nodeReg)
            throws IOException {
        checkNNStartup();
        verifySoftwareVersion(nodeReg);
        namesystem.registerDatanode(nodeReg);
        return nodeReg;
    }

    /**
     * Datanode向namenode发送心跳,默认3秒一次
     * 在HA架构下,DataNode需要向Active Node和Standby Node发送心跳,不过只有Active Node才能向Datanode下发指令
     */
    @Override // DatanodeProtocol
    public HeartbeatResponse sendHeartbeat(DatanodeRegistration nodeReg,
                                           StorageReport[] report, long dnCacheCapacity, long dnCacheUsed,
                                           int xmitsInProgress, int xceiverCount,
                                           int failedVolumes) throws IOException {
        checkNNStartup();
        verifyRequest(nodeReg);
        return namesystem.handleHeartbeat(nodeReg, report,
                dnCacheCapacity, dnCacheUsed, xceiverCount, xmitsInProgress,
                failedVolumes);
    }

    /**
     * DataNode向namenode上报它管理的所有数据块信息
     * Datanode启动以及指定间隔时执行一次,默认6小时执行一次
     */
    @Override // DatanodeProtocol
    public DatanodeCommand blockReport(DatanodeRegistration nodeReg,
                                       String poolId, StorageBlockReport[] reports,
                                       BlockReportContext context) throws IOException {
        checkNNStartup();
        verifyRequest(nodeReg);
        if (blockStateChangeLog.isDebugEnabled()) {
            blockStateChangeLog.debug("*BLOCK* NameNode.blockReport: "
                    + "from " + nodeReg + ", reports.length=" + reports.length);
        }
        final BlockManager bm = namesystem.getBlockManager();
        boolean noStaleStorages = false;
        for (int r = 0; r < reports.length; r++) {
            final BlockListAsLongs blocks =
                    new BlockListAsLongs(reports[r].getBlocks());
            //
            // BlockManager.processReport accumulates information of prior calls
            // for the same node and storage, so the value returned by the last
            // call of this loop is the final updated value for noStaleStorage.
            //
            noStaleStorages = bm.processReport(nodeReg, reports[r].getStorage(),
                    blocks, context, (r == reports.length - 1));
            metrics.incrStorageBlockReportOps();
        }

        if (nn.getFSImage().isUpgradeFinalized() &&
                !namesystem.isRollingUpgrade() &&
                !nn.isStandbyState() &&
                noStaleStorages) {
            return new FinalizeCommand(poolId);
        }

        return null;
    }

    /**
     * 和blockReport()方法完全一致,只不过汇报的是当前Datanode上缓存的所有数据块
     */
    @Override
    public DatanodeCommand cacheReport(DatanodeRegistration nodeReg,
                                       String poolId, List<Long> blockIds) throws IOException {
        checkNNStartup();
        verifyRequest(nodeReg);
        if (blockStateChangeLog.isDebugEnabled()) {
            blockStateChangeLog.debug("*BLOCK* NameNode.cacheReport: "
                    + "from " + nodeReg + " " + blockIds.size() + " blocks");
        }
        namesystem.getCacheManager().processCacheReport(nodeReg, blockIds);
        return null;
    }

    /**
     * 接收datanode上报过来的数据,datanode接收到了一个block
     * <p>
     * Datanode默认每隔5分钟调用这个方法,向namenode汇报自己新接受的数据块或者删除的数据块
     */
    @Override
    public void blockReceivedAndDeleted(DatanodeRegistration nodeReg, String poolId,
                                        StorageReceivedDeletedBlocks[] receivedAndDeletedBlocks) throws IOException {
        checkNNStartup();
        verifyRequest(nodeReg);
        metrics.incrBlockReceivedAndDeletedOps();
        if (blockStateChangeLog.isDebugEnabled()) {
            blockStateChangeLog.debug("*BLOCK* NameNode.blockReceivedAndDeleted: "
                    + "from " + nodeReg + " " + receivedAndDeletedBlocks.length
                    + " blocks.");
        }
        for (StorageReceivedDeletedBlocks r : receivedAndDeletedBlocks) {
            namesystem.processIncrementalBlockReport(nodeReg, r);
        }
    }

    /**
     * 用于向namenode上报运行过程中发生的一些状况,如磁盘不可用
     */
    @Override // DatanodeProtocol
    public void errorReport(DatanodeRegistration nodeReg,
                            int errorCode, String msg) throws IOException {
        checkNNStartup();
        String dnName =
                (nodeReg == null) ? "Unknown DataNode" : nodeReg.toString();

        if (errorCode == DatanodeProtocol.NOTIFY) {
            LOG.info("Error report from " + dnName + ": " + msg);
            return;
        }
        verifyRequest(nodeReg);

        if (errorCode == DatanodeProtocol.DISK_ERROR) {
            LOG.warn("Disk error on " + dnName + ": " + msg);
        } else if (errorCode == DatanodeProtocol.FATAL_DISK_ERROR) {
            LOG.warn("Fatal disk error on " + dnName + ": " + msg);
            namesystem.getBlockManager().getDatanodeManager().removeDatanode(nodeReg);
        } else {
            LOG.info("Error report from " + dnName + ": " + msg);
        }
    }

    /**
     * Datanode启动会调用这个方法于namenode握手,这个方法返回一个NamespaceInfo对象
     * NamespaceInfo对象封装了当前HDFS集群的命名空间信息
     * DataNode获取到NamespaceInfo对象后,会比较当前Datanode的HDFS版本号和Namenode的HDFS版本号
     * 如果Datanode版本和Namenode版本不能协同工作,则抛出异常,Datanode不能注册到NameNode上
     */
    @Override // DatanodeProtocol, NamenodeProtocol
    public NamespaceInfo versionRequest() throws IOException {
        checkNNStartup();
        namesystem.checkSuperuserPrivilege();
        return namesystem.getNamespaceInfo();
    }

    /**
     * Verifies the given registration.
     *
     * @param nodeReg node registration
     * @throws UnregisteredNodeException if the registration is invalid
     */
    private void verifyRequest(NodeRegistration nodeReg) throws IOException {
        // verify registration ID
        final String id = nodeReg.getRegistrationID();
        final String expectedID = namesystem.getRegistrationID();
        if (!expectedID.equals(id)) {
            LOG.warn("Registration IDs mismatched: the "
                    + nodeReg.getClass().getSimpleName() + " ID is " + id
                    + " but the expected ID is " + expectedID);
            throw new UnregisteredNodeException(nodeReg);
        }
    }


    @Override // RefreshAuthorizationPolicyProtocol
    public void refreshServiceAcl() throws IOException {
        checkNNStartup();
        if (!serviceAuthEnabled) {
            throw new AuthorizationException("Service Level Authorization not enabled!");
        }

        this.clientRpcServer.refreshServiceAcl(new Configuration(), new HDFSPolicyProvider());
        if (this.serviceRpcServer != null) {
            this.serviceRpcServer.refreshServiceAcl(new Configuration(), new HDFSPolicyProvider());
        }
    }

    @Override // RefreshAuthorizationPolicyProtocol
    public void refreshUserToGroupsMappings() throws IOException {
        LOG.info("Refreshing all user-to-groups mappings. Requested by user: " +
                getRemoteUser().getShortUserName());
        Groups.getUserToGroupsMappingService().refresh();
    }

    @Override // RefreshAuthorizationPolicyProtocol
    public void refreshSuperUserGroupsConfiguration() {
        LOG.info("Refreshing SuperUser proxy group mapping list ");

        ProxyUsers.refreshSuperUserGroupsConfiguration();
    }

    @Override // RefreshCallQueueProtocol
    public void refreshCallQueue() {
        LOG.info("Refreshing call queue.");

        Configuration conf = new Configuration();
        clientRpcServer.refreshCallQueue(conf);
        if (this.serviceRpcServer != null) {
            serviceRpcServer.refreshCallQueue(conf);
        }
    }

    @Override // GenericRefreshProtocol
    public Collection<RefreshResponse> refresh(String identifier, String[] args) {
        // Let the registry handle as needed
        return RefreshRegistry.defaultRegistry().dispatch(identifier, args);
    }

    @Override // GetUserMappingsProtocol
    public String[] getGroupsForUser(String user) throws IOException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Getting groups for user " + user);
        }
        return UserGroupInformation.createRemoteUser(user).getGroupNames();
    }

    @Override // HAServiceProtocol
    public synchronized void monitorHealth() throws HealthCheckFailedException,
            AccessControlException, IOException {
        checkNNStartup();
        nn.monitorHealth();
    }

    @Override // HAServiceProtocol
    public synchronized void transitionToActive(StateChangeRequestInfo req)
            throws ServiceFailedException, AccessControlException, IOException {
        checkNNStartup();
        nn.checkHaStateChange(req);
        nn.transitionToActive();
    }

    @Override // HAServiceProtocol
    public synchronized void transitionToStandby(StateChangeRequestInfo req)
            throws ServiceFailedException, AccessControlException, IOException {
        checkNNStartup();
        nn.checkHaStateChange(req);
        nn.transitionToStandby();
    }

    @Override // HAServiceProtocol
    public synchronized HAServiceStatus getServiceStatus()
            throws AccessControlException, ServiceFailedException, IOException {
        checkNNStartup();
        return nn.getServiceStatus();
    }

    /**
     * Verify version.
     *
     * @param version layout version
     * @throws IOException on layout version mismatch
     */
    void verifyLayoutVersion(int version) throws IOException {
        if (version != HdfsConstants.NAMENODE_LAYOUT_VERSION)
            throw new IncorrectVersionException(
                    HdfsConstants.NAMENODE_LAYOUT_VERSION, version, "data node");
    }

    private void verifySoftwareVersion(DatanodeRegistration dnReg)
            throws IncorrectVersionException {
        String dnVersion = dnReg.getSoftwareVersion();
        if (VersionUtil.compareVersions(dnVersion, minimumDataNodeVersion) < 0) {
            IncorrectVersionException ive = new IncorrectVersionException(
                    minimumDataNodeVersion, dnVersion, "DataNode", "NameNode");
            LOG.warn(ive.getMessage() + " DN: " + dnReg);
            throw ive;
        }
        String nnVersion = VersionInfo.getVersion();
        if (!dnVersion.equals(nnVersion)) {
            String messagePrefix = "Reported DataNode version '" + dnVersion +
                    "' of DN " + dnReg + " does not match NameNode version '" +
                    nnVersion + "'";
            long nnCTime = nn.getFSImage().getStorage().getCTime();
            long dnCTime = dnReg.getStorageInfo().getCTime();
            if (nnCTime != dnCTime) {
                IncorrectVersionException ive = new IncorrectVersionException(
                        messagePrefix + " and CTime of DN ('" + dnCTime +
                                "') does not match CTime of NN ('" + nnCTime + "')");
                LOG.warn(ive);
                throw ive;
            } else {
                LOG.info(messagePrefix +
                        ". Note: This is normal during a rolling upgrade.");
            }
        }
    }

    private static String getClientMachine() {
        String clientMachine = NamenodeWebHdfsMethods.getRemoteAddress();
        if (clientMachine == null) { //not a web client
            clientMachine = Server.getRemoteAddress();
        }
        if (clientMachine == null) { //not a RPC client
            clientMachine = "";
        }
        return clientMachine;
    }

    @Override
    public DataEncryptionKey getDataEncryptionKey() throws IOException {
        checkNNStartup();
        return namesystem.getBlockManager().generateDataEncryptionKey();
    }

    /**
     * 创建快照
     */
    @Override
    public String createSnapshot(String snapshotRoot, String snapshotName)
            throws IOException {
        checkNNStartup();
        if (!checkPathLength(snapshotRoot)) {
            throw new IOException("createSnapshot: Pathname too long.  Limit "
                    + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
        }
        namesystem.checkOperation(OperationCategory.WRITE);
        metrics.incrCreateSnapshotOps();
        return namesystem.createSnapshot(snapshotRoot, snapshotName);
    }

    /**
     * 删除快照
     */
    @Override
    public void deleteSnapshot(String snapshotRoot, String snapshotName)
            throws IOException {
        checkNNStartup();
        namesystem.checkOperation(OperationCategory.WRITE);
        metrics.incrDeleteSnapshotOps();
        namesystem.deleteSnapshot(snapshotRoot, snapshotName);
    }

    /**
     * 开启指定目录的快照功能,一个目录必须在开启快照功能之后才可以添加快照
     */
    @Override
    public void allowSnapshot(String snapshotRoot) throws IOException {
        checkNNStartup();
        metrics.incrAllowSnapshotOps();
        namesystem.allowSnapshot(snapshotRoot);
    }

    /**
     * 关闭指定目录的快照功能
     */
    @Override
    public void disallowSnapshot(String snapshot) throws IOException {
        checkNNStartup();
        metrics.incrDisAllowSnapshotOps();
        namesystem.disallowSnapshot(snapshot);
    }

    /**
     * 重命名快照
     */
    @Override
    public void renameSnapshot(String snapshotRoot, String snapshotOldName,
                               String snapshotNewName) throws IOException {
        checkNNStartup();
        if (snapshotNewName == null || snapshotNewName.isEmpty()) {
            throw new IOException("The new snapshot name is null or empty.");
        }
        namesystem.checkOperation(OperationCategory.WRITE);
        metrics.incrRenameSnapshotOps();
        namesystem.renameSnapshot(snapshotRoot, snapshotOldName, snapshotNewName);
    }

    @Override // Client Protocol
    public SnapshottableDirectoryStatus[] getSnapshottableDirListing()
            throws IOException {
        checkNNStartup();
        SnapshottableDirectoryStatus[] status = namesystem
                .getSnapshottableDirListing();
        metrics.incrListSnapshottableDirOps();
        return status;
    }

    /**
     * 获取两个快照间的不同
     */
    @Override
    public SnapshotDiffReport getSnapshotDiffReport(String snapshotRoot,
                                                    String earlierSnapshotName, String laterSnapshotName) throws IOException {
        checkNNStartup();
        SnapshotDiffReport report = namesystem.getSnapshotDiffReport(snapshotRoot,
                earlierSnapshotName, laterSnapshotName);
        metrics.incrSnapshotDiffReportOps();
        return report;
    }

    /**
     * 添加一个缓存
     */
    @Override
    public long addCacheDirective(
            CacheDirectiveInfo path, EnumSet<CacheFlag> flags) throws IOException {
        checkNNStartup();
        namesystem.checkOperation(OperationCategory.WRITE);
        return namesystem.addCacheDirective(path, flags);
    }

    /**
     * 修改缓存
     */
    @Override
    public void modifyCacheDirective(
            CacheDirectiveInfo directive, EnumSet<CacheFlag> flags) throws IOException {
        checkNNStartup();
        namesystem.checkOperation(OperationCategory.WRITE);
        namesystem.modifyCacheDirective(directive, flags);
    }

    /**
     * 删除缓存
     */
    @Override
    public void removeCacheDirective(long id) throws IOException {
        checkNNStartup();
        namesystem.checkOperation(OperationCategory.WRITE);
        namesystem.removeCacheDirective(id);
    }

    /**
     * 列出指定路径下的所有缓存
     */
    @Override
    public BatchedEntries<CacheDirectiveEntry> listCacheDirectives(long prevId,
                                                                   CacheDirectiveInfo filter) throws IOException {
        checkNNStartup();
        if (filter == null) {
            filter = new CacheDirectiveInfo.Builder().build();
        }
        return namesystem.listCacheDirectives(prevId, filter);
    }

    /**
     * 添加一个缓冲池
     */
    @Override
    public void addCachePool(CachePoolInfo info) throws IOException {
        checkNNStartup();
        namesystem.checkOperation(OperationCategory.WRITE);
        namesystem.addCachePool(info);
    }

    /**
     * 修改已有缓冲池元数据
     */
    @Override
    public void modifyCachePool(CachePoolInfo info) throws IOException {
        checkNNStartup();
        namesystem.checkOperation(OperationCategory.WRITE);
        namesystem.modifyCachePool(info);
    }

    /**
     * 删除缓冲池
     */
    @Override
    public void removeCachePool(String cachePoolName) throws IOException {
        checkNNStartup();
        namesystem.checkOperation(OperationCategory.WRITE);
        namesystem.removeCachePool(cachePoolName);
    }

    /**
     * 列出已有缓冲池的信息
     */
    @Override
    public BatchedEntries<CachePoolEntry> listCachePools(String prevKey)
            throws IOException {
        checkNNStartup();
        return namesystem.listCachePools(prevKey != null ? prevKey : "");
    }

    @Override // ClientProtocol
    public void modifyAclEntries(String src, List<AclEntry> aclSpec)
            throws IOException {
        checkNNStartup();
        namesystem.modifyAclEntries(src, aclSpec);
    }

    @Override // ClienProtocol
    public void removeAclEntries(String src, List<AclEntry> aclSpec)
            throws IOException {
        checkNNStartup();
        namesystem.removeAclEntries(src, aclSpec);
    }

    @Override // ClientProtocol
    public void removeDefaultAcl(String src) throws IOException {
        checkNNStartup();
        namesystem.removeDefaultAcl(src);
    }

    @Override // ClientProtocol
    public void removeAcl(String src) throws IOException {
        checkNNStartup();
        namesystem.removeAcl(src);
    }

    @Override // ClientProtocol
    public void setAcl(String src, List<AclEntry> aclSpec) throws IOException {
        checkNNStartup();
        namesystem.setAcl(src, aclSpec);
    }

    @Override // ClientProtocol
    public AclStatus getAclStatus(String src) throws IOException {
        checkNNStartup();
        return namesystem.getAclStatus(src);
    }

    @Override // ClientProtocol
    public void createEncryptionZone(String src, String keyName)
            throws IOException {
        checkNNStartup();
        namesystem.checkOperation(OperationCategory.WRITE);
        namesystem.createEncryptionZone(src, keyName);
    }

    @Override // ClientProtocol
    public EncryptionZone getEZForPath(String src)
            throws IOException {
        checkNNStartup();
        return namesystem.getEZForPath(src);
    }

    @Override // ClientProtocol
    public BatchedEntries<EncryptionZone> listEncryptionZones(
            long prevId) throws IOException {
        checkNNStartup();
        return namesystem.listEncryptionZones(prevId);
    }

    @Override // ClientProtocol
    public void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag)
            throws IOException {
        checkNNStartup();
        namesystem.checkOperation(OperationCategory.WRITE);
        namesystem.setXAttr(src, xAttr, flag);
    }

    @Override // ClientProtocol
    public List<XAttr> getXAttrs(String src, List<XAttr> xAttrs)
            throws IOException {
        checkNNStartup();
        return namesystem.getXAttrs(src, xAttrs);
    }

    @Override // ClientProtocol
    public List<XAttr> listXAttrs(String src) throws IOException {
        checkNNStartup();
        return namesystem.listXAttrs(src);
    }

    @Override // ClientProtocol
    public void removeXAttr(String src, XAttr xAttr) throws IOException {
        checkNNStartup();
        namesystem.checkOperation(OperationCategory.WRITE);
        namesystem.removeXAttr(src, xAttr);
    }

    private void checkNNStartup() throws IOException {
        if (!this.nn.isStarted()) {
            throw new IOException(this.nn.getRole() + " still not started");
        }
    }

    @Override // ClientProtocol
    public void checkAccess(String path, FsAction mode) throws IOException {
        checkNNStartup();
        namesystem.checkAccess(path, mode);
    }

    @Override // ClientProtocol
    public long getCurrentEditLogTxid() throws IOException {
        checkNNStartup();
        namesystem.checkOperation(OperationCategory.READ); // only active
        namesystem.checkSuperuserPrivilege();
        // if it's not yet open for write, we may be in the process of transitioning
        // from standby to active and may not yet know what the latest committed
        // txid is
        return namesystem.getEditLog().isOpenForWrite() ?
                namesystem.getEditLog().getLastWrittenTxId() : -1;
    }

    private static FSEditLogOp readOp(EditLogInputStream elis)
            throws IOException {
        try {
            return elis.readOp();
            // we can get the below two exceptions if a segment is deleted
            // (because we have accumulated too many edits) or (for the local journal/
            // no-QJM case only) if a in-progress segment is finalized under us ...
            // no need to throw an exception back to the client in this case
        } catch (FileNotFoundException e) {
            LOG.debug("Tried to read from deleted or moved edit log segment", e);
            return null;
        } catch (TransferFsImage.HttpGetFailedException e) {
            LOG.debug("Tried to read from deleted edit log segment", e);
            return null;
        }
    }

    @Override // ClientProtocol
    public EventBatchList getEditsFromTxid(long txid) throws IOException {
        checkNNStartup();
        namesystem.checkOperation(OperationCategory.READ); // only active
        namesystem.checkSuperuserPrivilege();
        int maxEventsPerRPC = nn.conf.getInt(
                DFSConfigKeys.DFS_NAMENODE_INOTIFY_MAX_EVENTS_PER_RPC_KEY,
                DFSConfigKeys.DFS_NAMENODE_INOTIFY_MAX_EVENTS_PER_RPC_DEFAULT);
        FSEditLog log = namesystem.getFSImage().getEditLog();
        long syncTxid = log.getSyncTxId();
        // If we haven't synced anything yet, we can only read finalized
        // segments since we can't reliably determine which txns in in-progress
        // segments have actually been committed (e.g. written to a quorum of JNs).
        // If we have synced txns, we can definitely read up to syncTxid since
        // syncTxid is only updated after a transaction is committed to all
        // journals. (In-progress segments written by old writers are already
        // discarded for us, so if we read any in-progress segments they are
        // guaranteed to have been written by this NameNode.)
        boolean readInProgress = syncTxid > 0;

        List<EventBatch> batches = Lists.newArrayList();
        int totalEvents = 0;
        long maxSeenTxid = -1;
        long firstSeenTxid = -1;

        if (syncTxid > 0 && txid > syncTxid) {
            // we can't read past syncTxid, so there's no point in going any further
            return new EventBatchList(batches, firstSeenTxid, maxSeenTxid, syncTxid);
        }

        Collection<EditLogInputStream> streams = null;
        try {
            streams = log.selectInputStreams(txid, 0, null, readInProgress);
        } catch (IllegalStateException e) { // can happen if we have
            // transitioned out of active and haven't yet transitioned to standby
            // and are using QJM -- the edit log will be closed and this exception
            // will result
            LOG.info("NN is transitioning from active to standby and FSEditLog " +
                    "is closed -- could not read edits");
            return new EventBatchList(batches, firstSeenTxid, maxSeenTxid, syncTxid);
        }

        boolean breakOuter = false;
        for (EditLogInputStream elis : streams) {
            // our assumption in this code is the EditLogInputStreams are ordered by
            // starting txid
            try {
                FSEditLogOp op = null;
                while ((op = readOp(elis)) != null) {
                    // break out of here in the unlikely event that syncTxid is so
                    // out of date that its segment has already been deleted, so the first
                    // txid we get is greater than syncTxid
                    if (syncTxid > 0 && op.getTransactionId() > syncTxid) {
                        breakOuter = true;
                        break;
                    }

                    EventBatch eventBatch = InotifyFSEditLogOpTranslator.translate(op);
                    if (eventBatch != null) {
                        batches.add(eventBatch);
                        totalEvents += eventBatch.getEvents().length;
                    }
                    if (op.getTransactionId() > maxSeenTxid) {
                        maxSeenTxid = op.getTransactionId();
                    }
                    if (firstSeenTxid == -1) {
                        firstSeenTxid = op.getTransactionId();
                    }
                    if (totalEvents >= maxEventsPerRPC || (syncTxid > 0 &&
                            op.getTransactionId() == syncTxid)) {
                        // we're done
                        breakOuter = true;
                        break;
                    }
                }
            } finally {
                elis.close();
            }
            if (breakOuter) {
                break;
            }
        }

        return new EventBatchList(batches, firstSeenTxid, maxSeenTxid, syncTxid);
    }

    @Override // TraceAdminProtocol
    public SpanReceiverInfo[] listSpanReceivers() throws IOException {
        checkNNStartup();
        namesystem.checkSuperuserPrivilege();
        return nn.spanReceiverHost.listSpanReceivers();
    }

    @Override // TraceAdminProtocol
    public long addSpanReceiver(SpanReceiverInfo info) throws IOException {
        checkNNStartup();
        namesystem.checkSuperuserPrivilege();
        return nn.spanReceiverHost.addSpanReceiver(info);
    }

    @Override // TraceAdminProtocol
    public void removeSpanReceiver(long id) throws IOException {
        checkNNStartup();
        namesystem.checkSuperuserPrivilege();
        nn.spanReceiverHost.removeSpanReceiver(id);
    }
}
